Python协程单线程异步IO实现并发

45次阅读
没有评论

共计 2242 个字符,预计需要花费 6 分钟才能阅读完成。

Python 中 单线程 的异步编程模型称为协程。一般情况下,当程序处于 IO 操作的时候,线程都会处于阻塞状态。线程是 CPU 控制的,而协程是程序自身控制的,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级。

协程:当程序遇见了 IO 操作的时候,可以选择性的切换到其他任务上。在单线程条件下,微观上协程是一个任务一个任务的进行切换,宏观上看是多个任务一起在执行(多任务异步操作)。

要用在异步 IO 编程中依赖的库必须支持异步 IO 特性。

asyncio

是 Python 3.4 之后引入的标准库,内置对异步 IO 的支持。其编程模型是一个消息循环,从 asyncio 模块中直接获取一个 EventLoop 的引用,然后把需要执行的协程扔到 EventLoop 中执行,就实现了异步 IO。

import asyncio  # 导入异步编程库
import time

async def func1():
    print(" 李小龙 ")
    await asyncio.sleep(5)
    print(" 李小龙 ")

async def func2():
    print(" 李大龙 ")
    await asyncio.sleep(3)
    print(" 李大龙 ")

async def main():
    tasks = [asyncio.create_task(func1()), asyncio.create_task(func2())]
    await asyncio.wait(tasks)

if __name__ == "__main__":
    t1 = time.time()
    asyncio.run(main())
    t2 = time.time()
    print(t2 - t1)

异步爬虫

爬虫中 requests 库不支持异步,需要用 aiohttp。

import asyncio
import aiohttp
import time

async def async_craw(url):
    print("craw url:", url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            result = await resp.text()
            print(f"craw url: {url}, {len(result)}")

urls = [f"https://news.cnblogs.com/n/page/{page}" for page in range(1, 10 + 1)]

loop = asyncio.get_event_loop()
tasks = [loop.create_task(async_craw(url)) for url in urls]

start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time seconds:", end - start)

Python 3.7 以上版本的写法:

import asyncio
import aiohttp
import time

urls = [f"https://news.cnblogs.com/n/page/{page}" for page in range(1, 10 + 1)]

async def async_craw(url):
    print("craw url:", url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            result = await resp.text()
            print(f"craw url: {url}, {len(result)}")

async def main():
    tasks = [asyncio.create_task(async_craw(url)) for url in urls]
    asyncio.wait(tasks)

if __name__ == "__main__":
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print("use time seconds:", end - start)

信号量 Semaphore

信号量又称为旗语,是一个计数器,用来设置并发度。

import asyncio
import aiohttp
import time

semaphore = asyncio.Semaphore(5)  # 信号量 5

async def async_craw(url):
    async with semaphore:
        print("craw url:", url)
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                result = await resp.text()
                print(f"craw url: {url}, {len(result)}")

urls = [f"https://news.cnblogs.com/n/page/{page}" for page in range(1, 10 + 1)]

loop = asyncio.get_event_loop()
tasks = [loop.create_task(async_craw(url)) for url in urls]

start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time seconds:", end - start)

正文完
 0
三毛笔记
版权声明:本站原创文章,由 三毛笔记 于2023-08-17发表,共计2242字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)